/*
 * Copyright (c) 2012 MCRI, authors
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use,
 * copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following
 * conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
 * THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */
package bpipe

import java.util.Collections.SynchronizedList
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import groovy.transform.CompileStatic
import groovy.util.logging.Log;

/**
 * A single request for a resource, paired with the allocation that is
 * eventually made for it.
 * <p>
 * Instances are created with only {@link #resource} populated; 
 * {@link #allocated} has no initial value and is filled in when 
 * resources are distributed among the outstanding requests.
 */
class ResourceRequest implements Serializable {
    
    public static final long serialVersionUID = 0L
    
    /**
     * The amount of the resource to be requested
     * <p>
     * In this attribute, the maxAmount of zero is interpreted as
     * requesting an unlimited amount up to the default max_per_command_threads 
     * value which is set in the default / user bpipe.config
     */
    ResourceUnit resource
    
    /**
     * The amount that was allocated.
     * <p>
     * In this attribute, the maxAmount is set to contain the actual maximum that was
     * derived after considering configuration from bpipe.config, pipeline stage and 
     * defaults.
     * <p>
     * Null until an allocation has been made for this request.
     */
    ResourceUnit allocated
}

/**
 * Manages concurrency for parallel pipelines.
 * <p>
 * This class is responsible for managing a thread pool that is configured
 * with size according to the maximum concurrency specified by the user
 * on the command line (-n option), and it also handles execution of groups
 * of tasks as a unit so that they can all be entered into the common thread
 * pool and the flow returns only when they all are completed.
 * <p>
 * There are two layers of concurrency management implemented. The first is the
 * raw capacity of the thread pool. This ensures that absolute concurrency within
 * Bpipe can't exceed the user's configuration.  However there is a second, logical
 * level of concurrency that is enforced on top of that, using a global semaphore
 * that is acquired/released as each parallel segment runs. The purpose of this
 * second "logical" level is that it allows a user to reserve more than n=1 concurrency
 * for a single thread if that thread will create particularly heavy load. The 
 * obvious situation where that happens is if the thread itself launches child threads
 * that are outside of Bpipe's control, or if it runs (shell) commands that themselves
 * launch multiple threads. In these cases the "logical" concurrency control can 
 * be used to restrict the actual concurrency below that enforced by the physical
 * thread pool to manage the actual load generated by the pipeline.
 */
@Singleton
@Log
class Concurrency {
    
    /**
     * The maximum time a resource request will be kept waiting before distributing resources.
     * The purpose of this period is to allow for fairer scheduling of resources.
     * <p>
     * Expressed in milliseconds; it is compared against the time elapsed since
     * the request was started.
     */
    public final static long AUCTION_TIMEOUT_MS = 5000L
    
    /**
     * When waiting for an auction to start, resource requestors will periodically poll
     * to check if they have timed out yet with this frequency (see #AUCTION_TIMEOUT_MS)
     * <p>
     * Expressed in milliseconds (it is used as the timeout of a wait() call).
     */
    public final static long AUCTION_ATTENDEE_POLL_PERIOD = 2000L
    
    /**
     * The thread pools to use for executing tasks. Pools are organised into tiers,
     * where dependent threads *must* be placed in different tiers, to ensure there
     * cannot be a possibility of deadlock.
     * <p>
     * The list index is the tier number. Tier 0 is created eagerly here; pools for
     * higher tiers are appended on demand by execute().
     */
    List<ThreadPoolExecutor> pools = Collections.synchronizedList([initPool()])
    
    /**
     * Each resource allocation allocates resources for its resource type against
     * these resource allocations.
     * <p>
     * Keyed by resource key (eg: "threads", "memory"). Lookups in acquire() and 
     * release() are performed while synchronized on this map.
     */
    Map<String,Semaphore> resourceAllocations = initResourceAllocations()    
    
    /**
     * List of known resource requestors
     * <p>
     * Note: additions and removals are made while holding the monitor of 
     * resourceRequests (not of this list), which is the same monitor held
     * while bidders are counted for an auction.
     */
    List<ResourceRequestor> registeredResourceRequestors = Collections.synchronizedList([])
    
    /**
     * The current outstanding list of resource requests waiting for a resource auction to start
     * <p>
     * NOTE: in the future this needs to become a per-resource list. Right now, it only
     * contains requests for thread resources.
     * <p>
     * The monitor of this list is also used for the wait() / notifyAll() signalling 
     * between threads that are waiting for an auction.
     */
    List<ResourceRequest> resourceRequests = Collections.synchronizedList([])
	
    /**
     * Counts of threads running
     * <p>
     * Maps each submitted Runnable to the counter shared by the batch it was
     * submitted with. Populated by execute() and read by the afterExecute hook
     * of the pools; both access it while synchronized on this map.
     */
    Map<Runnable,AtomicInteger> counts = [:]
    
    /**
     * Create a thread pool for use by this class, based on the maximum concurrency 
     * configured by the user.
     * <p>
     * The returned pool overrides afterExecute so that, each time a task finishes,
     * the running count registered for that task in {@link #counts} is decremented
     * and any thread waiting on that counter is notified.
     * 
     * @param numThreads    core size of the pool. A negative value (the default)
     *                      selects twice the configured maximum number of threads
     *                      (Config.config.maxThreads).
     * @return  A ThreadPoolExecutor for executing jobs
     */
    ThreadPoolExecutor initPool(int numThreads=-1) {
        
        if(numThreads < 0)
            numThreads = Config.config.maxThreads*2 
        
        log.info "Creating thread pool with " + numThreads + " threads to execute parallel pipelines"
        
        // Workers are daemon threads so that they do not keep the JVM alive
        ThreadFactory threadFactory = { Runnable r ->
                          def t = new Thread(r)  
                          t.setDaemon(true)
                          return t
                        } as ThreadFactory
        
        // LinkedBlockingQueue vs SynchronousQueue vs ArrayBlockingQueue?
        //
        //    - SynchronousQueue holds zero elements. That means if no thread in 
        //      the pool is free, a new one is created: corePoolSize is effectively
        //      IGNORED
        //            
        //    - LinkedBlockingQueue : a queue with unbounded capacity, means that if
        //      no thread is available the task waits until one is. maxPoolSize is 
        //      ignored, only corePoolSize is used
        //     
        //    - ArrayBlockingQueue : a queue with fixed size, which cannot accept
        //      more items than its capacity
        //
        // In practice, observe that SynchronousQueue allows unlimited simultaneous
        // threads, it essentially disables the queueing and makes it so that any 
        // overflow from the pool results in a new thread being created. Hence the
        // LinkedBlockingQueue is used here.
        // 
        return new ThreadPoolExecutor(numThreads, Integer.MAX_VALUE,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(), 
                                      threadFactory) {
              @Override
              void afterExecute(Runnable r, Throwable t) {
                  AtomicInteger runningCount
                  synchronized(counts) {
                    runningCount = counts[r]
                  }
                  
                  // A task that was submitted directly to the pool (not through 
                  // Concurrency#execute) has no registered counter: nothing to
                  // decrement and nobody to notify. Without this check the hook
                  // would fail with a NullPointerException.
                  if(runningCount == null) {
                      log.warning "No running count registered for task completed in thread " + Thread.currentThread().id
                      return
                  }
                  
                  int value = runningCount.decrementAndGet()
                  
                  log.info "Decremented running count to $value in thread " + Thread.currentThread().id
                  
                  // Notify parent that will be waiting on this count
                  // for each decrement
                  synchronized(runningCount) {
                      runningCount.notify()
                  }
              }
        }
    }
        

    /**
     * Build the initial table of resource limits.
     * <p>
     * A "threads" semaphore is always created, sized by Config.config.maxThreads.
     * A "memory" semaphore is created when a maximum memory is configured; a value
     * in Config.config.maxMemoryMB takes precedence over one in Config.userConfig 
     * because it is applied last.
     * 
     * @return  map of resource key to the semaphore that limits that resource
     */
    Map initResourceAllocations() {

        Map res = [ threads: new Semaphore(Config.config.maxThreads)]

        def userMaxMemoryMB = Config.userConfig?.maxMemoryMB
        if(userMaxMemoryMB) {
            // The configured value may be a number (maxMemoryMB=4000) or a string
            // (maxMemoryMB="4000"): go through toString() so that both parse,
            // rather than passing a non-String straight to Integer.parseInt.
            res["memory"] = new Semaphore(Integer.parseInt(userMaxMemoryMB.toString().trim()))
        }               
        
        if(Config.config.maxMemoryMB) {
            log.info "Setting maximum memory to $Config.config.maxMemoryMB from configuration / command line"
            res["memory"] = new Semaphore(Config.config.maxMemoryMB)
        }               
        return res
    }
    
    /**
     * Execute the given list of runnables using the specified thread pool. Thread pools are maintained in
     * tiers, which are designed to separate threads which have dependencies (hence raising the
     * possibility of deadlocks). Each nested level of the pipeline runs in a separate "tier".
     * <p>
     * This method waits for all the runnables in the given list to finish before returning.
     * <p>
     * Completion is tracked with a counter shared by the whole batch: it is registered
     * against each runnable in {@link #counts}, and decremented by the pool's 
     * afterExecute hook as each runnable finishes.
     * 
     * @param runnables     the tasks to run
     * @param tier          index of the thread pool to use; pools for missing tiers
     *                      are created on demand
     */
    void execute(List<Runnable> runnables, int tier=0) {
        
        synchronized(this.pools) {
            while(this.pools.size()<=tier) {
                this.pools.add(initPool(Config.config.maxThreads+1))
            }
        }
        
        ThreadPoolExecutor pool = pools[tier]
        
        AtomicInteger runningCount = new AtomicInteger()
        
        // First set up the count of running pipelines
        for(Runnable r in runnables) {
            synchronized(counts) {
                runningCount.incrementAndGet()
                counts[r] = runningCount
            }
        }
        
        // Now put them in the thread pool
        for(Runnable r in runnables) {
            pool.execute(r); 
        }
            
        // Wait until the count of running threads reaches zero.
        // The count is decremented by the ThreadPoolExecutor#afterExecute
        // call as each thread finishes
        long lastLogTimeMillis = 0
        while(runningCount.get()) {
                
            if(lastLogTimeMillis < System.currentTimeMillis() - 5000) {
                log.info("Waiting for " + runningCount.get() + " parallel stages to complete (pool.active=${pool.activeCount} pool.tasks=${pool.taskCount})" )
                lastLogTimeMillis = System.currentTimeMillis()
            }
                    
            synchronized(runningCount) {
                runningCount.wait(50)
            }
                
            if(runningCount.get())
                Thread.sleep(300)
        }
        
        // Every task of this batch has completed, so its afterExecute hook has
        // already looked up the counter: drop the bookkeeping entries, otherwise
        // the map keeps a reference to every runnable ever executed.
        // Only entries still pointing at this batch's counter are removed.
        synchronized(counts) {
            for(Runnable r in runnables) {
                if(counts[r].is(runningCount))
                    counts.remove(r)
            }
        }
    }
    
   /**
    * Add the given requestor to the set of participants in resource auctions.
    * <p>
    * Auctions are made fairer by waiting for all potential "bidders" to 
    * put in their bid before resources are divided up, rather than handing 
    * resources out first-come-first-served (which tends to over allocate to 
    * whoever arrives first). Every pipeline segment registers itself as a 
    * bidder through this method so that auctions know to wait for it.
    * <p>
    * A requestor stays registered while its command executes, but reports
    * via its 'bidding' state that it is not currently taking part. Requestors 
    * should signal that they are not bidding whenever they are inactive (eg: 
    * while waiting for a command to finish), and should remove themselves 
    * with unregisterResourceRequestor when they are finished.
    * <p>
    * The registration is made while holding the monitor of the list of 
    * outstanding resource requests.
    * 
    * @param requestor  the participant to register
    */
   void registerResourceRequestor(ResourceRequestor requestor) {
       log.info("Register resource requestor $requestor")
       synchronized(this.resourceRequests) {
           this.registeredResourceRequestors << requestor
       }
   }
   
   /**
    * Remove the given requestor from the set of participants in resource
    * auctions, so that auctions no longer take it into account.
    * <p>
    * The removal is made while holding the monitor of the list of 
    * outstanding resource requests.
    * 
    * @param requestor  the participant to remove
    */
   void unregisterResourceRequestor(ResourceRequestor requestor) {
       log.info("Unregister resource requestor $requestor")
       synchronized(this.resourceRequests) {
           this.registeredResourceRequestors.remove(requestor)
       }
   }

   /**
    * Called by parallel paths before they begin execution: enforces overall concurrency by blocking
    * the thread before it can start work. (ie. this method may block).
    * <p>
    * If no limit is registered for the resource's key, the resource is treated as
    * infinite and the method returns immediately. For the "threads" resource the 
    * amount is first negotiated with the other bidders, so the amount actually 
    * acquired may differ from the amount requested; the acquired amount is written
    * back to <code>resourceUnit.amount</code>.
    * 
    * @param resourceUnit   the resource and amount to acquire; its amount is 
    *                       updated to the amount that was actually acquired
    */
   void acquire(ResourceUnit resourceUnit) {
        Semaphore resource
        synchronized(resourceAllocations) {
            resource = resourceAllocations.get(resourceUnit.key)
        }
        
        if(resource == null) {
            log.info "Unknown resource type $resourceUnit.key specified: treating as infinite resource"
            return
        }
        
       int amount = resourceUnit.amount
        
       log.info "Thread " + Thread.currentThread().id + 
           " requesting for $amount concurrency permit(s) type $resourceUnit.key with " + resource.availablePermits() + " available"
           
       long startTimeMs = System.currentTimeMillis()
       
       if(resourceUnit.key == "threads") {
           amount = negotiateDynamicResources(resourceUnit, resource)
       }
           
       resource.acquire(amount)
       resourceUnit.amount = amount
       
       // Elapsed time = now - start. (The operands were previously reversed, which
       // made the duration negative so that blocking was never reported.)
       long durationMs = System.currentTimeMillis() - startTimeMs
       if(durationMs > 1000) {
           log.info "Thread " + Thread.currentThread().id + " blocked for $durationMs ms waiting for resource $resourceUnit.key amount(s) $amount"
       }
       else
           log.info "Thread " + Thread.currentThread().id + " acquired resource $resourceUnit.key in amount $amount"
   }

   /**
    * Attempt to share the given resource fairly among all requestors that are bidding
    * for the same resource.
    * <p>
    * At the moment, this is only implemented for threads to support the dynamic 
    * "$threads" variable. The implementation blocks ALL requests for thread resources
    * prior to the actual resource acquisition, to adjust their requests. The result is that
    * acquisition takes place in two phases:
    * <ol>
    * <li> All the currently active segments put in their "bids" for resources by 
    *      entering this method
    * <li> They are blocked until enough bids have arrived, or until the auction 
    *      timeout has elapsed (see #tryAuction)
    * <li> The bids are then resolved by whichever thread observes that the auction
    *      is due, which assumes the role of dividing up the available resources 
    *      among the "bidders".
    * <li> The allocated amount is returned and replaces the amount actually used
    *      in the actual resource acquisition phase
    * </ol>
    * Note that in the future this will become a generic mechanism, so everything in here
    * is handled generically. However some limitations mean that for now we can't apply this
    * to more than one resource (specifically, dividing up resources obviously has to be done
    * "per resource"). The current implementation only works because <i>every</i> command has
    * to allocate a thread. Optional resources will have to have a more sophisticated 
    * per-resource handling.
    * 
    * @param resourceUnit   the amount of the resource requested
    * @param resource       the semaphore that controls access to the resource - used to 
    *                       estimate current available resources.
    * @return               the actual amount of resources allocated
    */
    private int negotiateDynamicResources(ResourceUnit resourceUnit, Semaphore resource) {
        int amount = resourceUnit.amount
        long startTimeMs = System.currentTimeMillis()
        ResourceRequest request = new ResourceRequest(resource:resourceUnit)
        synchronized(resourceRequests) {
            
            int numBidders = registeredResourceRequestors.count { it.bidding }
            
            // Logged for information only: computed the same way as the threshold 
            // that tryAuction actually applies, so that the log is not misleading
            int auctionThreshold = Math.min(numBidders, Config.config.maxThreads)
            
            log.info "There are ${registeredResourceRequestors.size()} registered bidders with $numBidders currently bidding, will distribute resources at $auctionThreshold"

            resourceRequests.add(request)
            try {
                while(true) {
                    // Either we trigger the auction ourselves, or another thread
                    // does and fills in our allocation while we are waiting
                    tryAuction(resource, startTimeMs) 
                    if(request.allocated == null) {
                        log.info "Thread ${Thread.currentThread().id} waiting for resource allocation ($resourceUnit)"
                    }
                    else {
                        log.info "Thread ${Thread.currentThread().id} allocated $request.allocated resources after " + (System.currentTimeMillis() - startTimeMs) + " ms"
                        amount = request.allocated.amount
                        break
                    }
                    resourceRequests.wait(AUCTION_ATTENDEE_POLL_PERIOD)
                }
            }
            finally {
                // If we leave without an allocation (eg: interrupted while waiting),
                // withdraw the bid so that it does not take part in later auctions
                if(request.allocated == null)
                    resourceRequests.remove(request)
            }
        }
        return amount
    }
    
    /**
     * Decide whether a resource auction is due and, if it is, hold it.
     * <p>
     * An auction is due when either:
     * <ul>
     *  <li>the number of outstanding requests has reached the auction threshold,
     *      which is the smaller of the number of requestors currently bidding 
     *      and the configured maximum number of threads, or
     *  <li>more than #AUCTION_TIMEOUT_MS milliseconds have passed since startTimeMs
     * </ul>
     * <p>
     * NOTE: thread safety of this code relies on it only being accessed by a thread that 
     *       holds a monitor on the resourceRequests list.
     * 
     * @param resource      semaphore controlling the resource to be auctioned
     * @param startTimeMs   time (ms since epoch) from which the timeout is measured
     * @return  true if an auction was held, false otherwise
     */
    boolean tryAuction(Semaphore resource, long startTimeMs) {
        int maxThreads = Config.config.maxThreads
        int numBidders = registeredResourceRequestors.count { bidder -> bidder.bidding }
        int auctionThreshold = Math.min(numBidders, maxThreads)
        
        // Work out why (if at all) the auction should go ahead
        String reason = null
        if(resourceRequests.size() >= auctionThreshold) { 
            reason = "Taking over resource allocation because requested/bidders = ${resourceRequests.size()} / min(numBidders=$numBidders,$maxThreads)=$auctionThreshold"
        }
        else
        if(System.currentTimeMillis()-startTimeMs > AUCTION_TIMEOUT_MS) {
            reason = "Assuming over resource allocation / delayed bidders because requested/bidders = ${resourceRequests.size()} / min(numBidders=$numBidders,$maxThreads)=$auctionThreshold"
        }
        
        if(reason == null)
            return false
            
        log.info reason
        this.allocateResources(resource)
        return true
    }
   
    /**
     * Distribute available resources to the current list of requestors,
     * then clear the list and notify all participants via notifyAll() on the 
     * resourceRequests list.
     * <p>
     * The current algorithm for allocating resources is quite simple. It works by first
     * distributing the minimum resources requested by all the participants to each 
     * participant (a request for an unlimited amount is seeded with 1). Then the 
     * remaining available resources are distributed evenly to the participants that 
     * requested a variable amount, to "top them up", without exceeding the maximum 
     * that applies to each of them. Finally any permits still left over are handed 
     * out one at a time to variable requestors that have not reached their maximum.
     * <p>
     * NOTE: thread safety of this code relies on it only being accessed by a thread that 
     *       holds a monitor on the resourceRequests list.
     * 
     * @param resource  the semaphore controlling the resource; only its count of 
     *                  available permits is read, nothing is acquired here
     */
    void allocateResources(Semaphore resource) {
       
       // Start by trying to allocate the minimum resources to everyone, if possible.
       // Unlimited requests are seeded with 1 and topped up with more afterwards.
       resourceRequests.each { r->
           r.allocated = new ResourceUnit(key: r.resource.key, amount: r.resource.amount)
           if(r.resource.amount == ResourceUnit.UNLIMITED)
               r.allocated.amount = 1 
       }
       
       log.info "First pass allocations are " + resourceRequests*.allocated*.amount
       
       // Then divide up the remainder of the free resources to the ones that are unlimited
       int freeResources = resource.availablePermits() - (resourceRequests.sum { it.allocated.amount }?:0)
       List<ResourceRequest> unlimitedRequestors = resourceRequests.grep { it.resource.amount == ResourceUnit.UNLIMITED || it.resource.maxAmount }
       if(freeResources > 0 && unlimitedRequestors) {
           
           // Integer division: freeResources is positive here, so this is the floor
           int perRequestorAmount = freeResources.intdiv(unlimitedRequestors.size())
           
           log.info "Dividing up $freeResources free resource permits among ${unlimitedRequestors.size()} requestors = $perRequestorAmount"
           
           unlimitedRequestors.eachWithIndex { ResourceRequest r, int i ->
               
               int myMax = r.resource.maxAmount
               if(r.resource.amount == ResourceUnit.UNLIMITED) {
                   // Note: default value for max_per_command_threads is set in the default bpipe.config,
                   // so unless the user explicitly overrode it, it will always be non-null
                   if(Config.userConfig.max_per_command_threads != null) {
                       myMax = Config.userConfig.max_per_command_threads.toInteger()
                   }
                   else
                       myMax = 0
               }
               
               // A maximum of zero means "no maximum"
               r.allocated.maxAmount = myMax
               
               int myAmount = perRequestorAmount 
               
               // Would this amount exceed the max amount set for this resource?
               if((myMax > 0) &&  (r.allocated.amount + perRequestorAmount > myMax)) {
                   // It would exceed the max amount: just allocate the difference to bring us up to the max amount
                   myAmount = Math.max(0, myMax - r.allocated.amount )
                   log.info "$perRequestorAmount is too much for requestor $i: allocating $myAmount "
               }
               
               r.allocated.amount += myAmount
               freeResources -= myAmount
           }
           
           // Hand out whatever is left (division remainder, or share that capped
           // requestors could not take), one permit each, in list order
           unlimitedRequestors.eachWithIndex { ResourceRequest r, int i ->
               if(freeResources>0 && (r.allocated.maxAmount == 0 || r.allocated.amount < r.allocated.maxAmount)) {
                   log.info "Bonus free resource to unlimited requestor $i"
                   r.allocated.amount ++
                   freeResources --
               }
           }
       }
       
       // Wake up all the waiting bidders: each holds a reference to its own 
       // request, so the list itself can be emptied for the next auction
       resourceRequests.notifyAll()
       
       resourceRequests.clear()
    }
   
   /**
    * Give back a previously acquired amount of a resource.
    * <p>
    * The semaphore registered for the resource's key is looked up and the 
    * unit's amount of permits is released to it. If no semaphore is registered 
    * for the key, the resource is treated as infinite and nothing is released.
    * 
    * @param resourceUnit   the resource and amount to release
    */
   void release(ResourceUnit resourceUnit) {
        Semaphore semaphore = null
        synchronized(resourceAllocations) {
            semaphore = resourceAllocations.get(resourceUnit.key)
        }
        
        if(semaphore != null) {
            semaphore.release(resourceUnit.amount)
            log.info("Thread " + Thread.currentThread().id + " releasing $resourceUnit.amount $resourceUnit.key")
        }
        else {
            log.info("Unknown resource type $resourceUnit.key specified: treating as infinite resource")
        }
   }
   
   /**
    * Set the limit for a resource from a textual value.
    * <p>
    * The "memory" resource is parsed by ResourceUnit.memory(); the value for
    * any other resource has to be an integer.
    * 
    * @param resourceName   key of the resource to limit
    * @param value          the limit, as text
    * @throws PipelineError if the resource is not "memory" and the value is 
    *                       not an integer
    */
   void setLimit(String resourceName, String value) {
       
       if(resourceName != "memory") {
           if(!value.isInteger())
               throw new PipelineError("Resource ${resourceName} limit could not be parsed as an integer and is not a known non-integer resource type")
               
           setLimit(resourceName, value.toInteger())
           return
       }
       
       // Memory: let ResourceUnit interpret the value
       ResourceUnit parsedMemory = ResourceUnit.memory(value)
       setLimit("memory", parsedMemory.amount)
   }
   
   
   /**
    * Set the limit for a resource to the given number of permits.
    * <p>
    * A new semaphore is installed for the resource, replacing any existing one.
    * Note that release() looks up the semaphore at the time of release, so 
    * permits acquired from a replaced semaphore are released to the new one.
    * 
    * @param resourceName   key of the resource to limit
    * @param amount         number of permits available for the resource
    */
   void setLimit(String resourceName, int amount) {
       // Readers (acquire / release) look up semaphores while synchronized on
       // the map, so updates take the same lock
       synchronized(this.resourceAllocations) {
           this.resourceAllocations.put(resourceName, new Semaphore(amount))
       }
   }
   
   /**
    * Apply the resource limits found in the 'limits' section of the user 
    * configuration, if there is one.
    * 
    * @param override   when true (the default) every configured limit is applied;
    *                   when false, a limit is only applied if no limit is 
    *                   registered yet for that resource
    */
   void initFromConfig(boolean override=true) {
       
       def configuredLimits = Config.userConfig.limits
       if(!configuredLimits) 
           return
       
       configuredLimits.each { name, limit ->
           log.info("Setting limit $name with value $limit from user configuration")
           
           // Keep an existing limit unless we were asked to override it
           if(!override && resourceAllocations.containsKey(name))
               return
               
           setLimit(name, limit)
       }
   }
   
   
   /**
    * Statically compiled accessor for the singleton.
    * 
    * @return   the value of <code>Concurrency.instance</code>
    */
   @CompileStatic
   static Concurrency getTheInstance() {
       return Concurrency.instance
   }
}
